[Improve](streaming-job) use per-table publication instead of ALL TABLES for PostgreSQL CDC#62526
[Improve](streaming-job) use per-table publication instead of ALL TABLES for PostgreSQL CDC#62526JNSimba wants to merge 3 commits intoapache:masterfrom
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
/review |
There was a problem hiding this comment.
Findings
- Blocking:
publication_nameis accepted as arbitrary non-empty text in FE, but both the create/update path and the cleanup path splice it directly into SQL (fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java,fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java). A user-provided name such asMyPuborpub-nameis valid only when quoted in PostgreSQL, so this feature currently accepts values that later fail at runtime. - Blocking: auto-created-resource cleanup is detected only by prefix (
doris_pub_/doris_cdc_) infs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java. That violates the PR contract that user-provided names are preserved: if a user explicitly setspublication_name = doris_pub_sharedorslot_name = doris_cdc_shared,DROP JOBwill still delete that external resource.
Critical Checkpoints
- Goal of current task: Partially met. Happy-path per-table publication / SHOW visibility are implemented and covered, but the two blockers above break custom-name support and cleanup semantics.
- Modification size / focus: Yes, mostly focused.
- Concurrency: No new concurrency-sensitive path identified.
- Lifecycle management: Needs work. External PostgreSQL resource ownership is inferred from name prefixes instead of persisted ownership.
- Configuration items: New source properties were added, but validation is incomplete for PostgreSQL identifier rules and cleanup semantics.
- Incompatible changes: No FE/BE protocol or storage compatibility issue found.
- Parallel code paths: No matching non-PostgreSQL path appears to need the same publication logic.
- Special conditional checks: Prefix-based auto-created detection is not a safe condition.
- Test coverage: Positive regression and unit coverage exists; missing negative tests for custom identifiers and user-provided prefixed names.
- Observability: Sufficient for this change.
- Transaction / persistence: Source property persistence and replay path looks okay.
- Data writes / modifications: Unsafe external-resource deletion behavior remains.
- FE-BE variable passing: Not applicable.
- Performance: No major concern found.
- Other issues: None beyond the blockers above.
There was a problem hiding this comment.
Pull request overview
Updates Doris PostgreSQL CDC streaming jobs to use per-table PostgreSQL publications (filtered mode) and to persist/show slot/publication names via job source properties, improving WAL decoding efficiency and operational visibility.
Changes:
- Add
slot_name/publication_namesource properties (auto-generated by FE when omitted) and surface them inSHOW JOBSoutput. - Switch Debezium publication autocreation to
filteredmode and addpublish_via_partition_root=truefor PG 13+ when creating filtered publications. - Add regression + unit tests covering generated names, per-table publication contents, and cleanup on job drop.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.groovy | New regression test validating filtered publications, name visibility, and cleanup behavior. |
| regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_publication.out | Expected snapshot query outputs for the new regression test. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java | Configure publication name + filtered autocreation; implement slot/publication cleanup logic. |
| fs_brokers/cdc_client/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java | Add publish_via_partition_root to filtered-mode CREATE PUBLICATION on PG 13+. |
| fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java | Unit tests for default slot/publication naming helpers and validator acceptance. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Populate default slot/publication names into sourceProperties for PG jobs so they persist and show up in SHOW/TVF. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java | Allow slot_name and publication_name keys in streaming job source properties. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java | Add constants + helper methods to generate bounded-length default slot/publication names with UUID suffix. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
|
/review |
There was a problem hiding this comment.
I found blocking compatibility and resource lifecycle regressions in the PostgreSQL CDC changes.
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java: legacy PostgreSQL CDC jobs created before this PR are no longer restart compatible. Before this change the reader never setpublication.name, so those jobs ran against the Debezium defaultdbz_publicationwithALL_TABLES. This patch now falls back todoris_pub_<jobId>, classifies the job as user managed, setspublication.autocreate.mode=disabled, and validates that the new publication exists. Replayed jobs whose persistedsourcePropertiesdo not containpublication_namewill fail initialization after upgrade, and their legacydoris_cdc_<jobId>slot will also never be cleaned up on drop.fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java: cleanup ownership is inferred from the current slot and publication names. Because these source properties remain mutable throughALTER JOB, changing either name after creation makesclose()skip cleanup and leaks the original Doris created slot or publication. Ownership needs to be persisted at creation time, or these properties need to be immutable.
Critical checkpoint conclusions:
- Goal: the PR aims to add per table PostgreSQL publications, custom slot and publication names, SHOW visibility, and cleanup. The new job happy path is covered, but the current code does not safely preserve existing jobs or resource lifecycle.
- Minimality and focus: the change stays localized, but it couples new naming, ownership inference, and cleanup without persisting compatibility or ownership state.
- Concurrency: no new lock order or heavy under lock issue stood out; the reader still uses the existing slot creation lock.
- Lifecycle: slot and publication lifecycle tracking is not reliable because it depends on current name patterns rather than persisted ownership or legacy metadata.
- Configuration items: new source properties are validated, but compatibility for already persisted jobs is incomplete.
- Incompatible changes: yes. Publication naming and default behavior changed without compatibility handling for pre existing persisted jobs.
- Parallel paths: replay or upgrade and ALTER JOB paths are not updated consistently with the create path.
- Special conditions:
isAutoGenerated()is too strong as an ownership predicate; it misses legacy Doris managed resources and any auto resource after name mutation. - Test coverage: validator unit tests and one regression test cover only the fresh job happy path; there is no replay or upgrade or ALTER and cleanup coverage for the new semantics.
- Observability: validation errors are actionable, but the cleanup path still logs only the message on failure, which leaves less detail for diagnosis.
- Transaction and persistence: no FE journal format change is needed, but persisted
sourcePropertiesfrom older jobs are not interpreted compatibly. - Data modification or external state: PostgreSQL publication and slot lifecycle can be broken or leaked.
- FE BE variable passing: not applicable.
- Performance: no material hot path issue stood out in the touched code.
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
|
/review |
There was a problem hiding this comment.
Findings
- Blocking:
slot_nameis now treated as independently user-owned, but the implementation never proves that a user-provided slot actually exists.initialize()skips the explicitcreateSlotForGlobalStreamSplit()branch whenslot_nameis set, yet the downstream replication path still callsPostgresReplicationConnection.initReplicationSlot()and auto-creates the slot whenevergetSlotInfo()returnsINVALID. In practice,CREATE JOB ... "slot_name" = "my_slot"will silently createmy_slotif the caller forgot to pre-create it, andclose()will then preserve that Doris-created slot forever because ownership is inferred from the config flag. This breaks the new ownership contract and leaks replication slots. - Blocking: the new validation for existing publications only checks name existence plus table membership (
pg_publication_tables), but partition-table support in this PR also depends onpublish_via_partition_root = true. A user-supplied or legacy publication for a partitioned root table can satisfy the current validation and still publish changes with the leaf partition identities instead of the root identity, while Doris still buildstable.include.listfrom the configured root names. That causes partition changes to be missed even though initialization succeeds. The validation needs to checkpg_publication.pubviaroot(and for Doris-owned publications, ensure the created publication keeps that setting).
Critical Checkpoints
- Goal of current task: Partially met. The PR adds filtered publications, SHOW visibility, custom names, cleanup, and tests for several happy paths, but the two cases above still violate the new PostgreSQL resource-ownership and partition-table correctness goals.
- Modification size / focus: Yes. The change set stays focused on PostgreSQL CDC job configuration, runtime initialization, cleanup, and tests.
- Concurrency: No blocking concurrency issue found. The existing slot-creation lock is preserved, and no new lock-order risk stood out in FE.
- Lifecycle management: Not fully correct. External PostgreSQL resource ownership is now modeled per resource, but user-provided slot lifecycle is not actually enforced because missing slots are still auto-created later.
- Configuration items added: Yes.
slot_nameandpublication_nameare validated and surfaced in SHOW, but the runtime behavior is still inconsistent with the intended ownership semantics in the slot path. - Incompatible changes: No FE/BE protocol or storage compatibility issue found in the final patch set.
- Functionally parallel code paths: The relevant PostgreSQL create/alter/show/cleanup paths were updated consistently; no matching MySQL path needs the same publication logic.
- Special conditional checks: The new ownership and validation branches are conceptually reasonable, but the publication check is incomplete because it ignores
pubviarootfor partition roots. - Test coverage: Positive coverage is improved with FE unit tests and a new regression suite, but there is still no negative coverage for the missing-slot user-owned case or for partitioned tables with user-supplied publications lacking
publish_via_partition_root. - Observability: Adequate for review purposes. Error messages are generally actionable.
- Transaction / persistence modifications: No EditLog / replay issue found in the touched code paths.
- Data writes / modifications: Yes, this change touches external PostgreSQL publication and replication-slot state; the two issues above can lead to leaked external resources or silently missed partition change events.
- FE-BE variable passing: Not applicable here.
- Performance: No major hot-path performance regression found in the reviewed changes.
- Other issues: None beyond the two blockers above.
|
/review |
|
run buildall |
|
OpenCode automated review failed and did not complete. Error: Review step was skipped (possibly timeout or cancelled) Please inspect the workflow logs and rerun the review after the underlying issue is resolved. |
FE Regression Coverage ReportIncrement line coverage |
…idation
- Use per-table publication (pgoutput publish_via_partition_root=true via
Debezium fork) instead of ALL TABLES, so non-CDC tables are not streamed.
- Split ownership per-resource: slot_name / publication_name are independently
user-owned or Doris-owned. Doris only creates/drops what it owns, with
deterministic default names doris_cdc_{jobId} / doris_pub_{jobId}.
- Reject slot_name / publication_name changes in ALTER JOB (immutable).
- Validate PG identifiers (lowercase, <=63 chars) at CREATE JOB time.
- Move fail-fast resource validation (user-provided slot/pub existence,
publication coverage, auto slot active-conflict) from cdc client to FE
via StreamingJobUtils.validateSourceResources(). Covers both
StreamingInsertJob and cdc_stream TVF paths; runs once on create, not
on each BE restart.
- Regression tests for publication ownership matrix, restart-FE safety,
TVF validation cases, and PG user privilege requirements.
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
0fc7b73 to
f965076
Compare
|
/review |
|
run buildall |
There was a problem hiding this comment.
I found 2 blocking issues.
- Existing PostgreSQL streaming jobs are not restart/rolling-upgrade compatible. Jobs created before this PR persisted no
publication_name; they previously used Debezium's defaultdbz_publication, but the new fallback resolves missing values todoris_pub_{jobId}. After restart or upgrade the same job will use a different publication, may need new CREATE/ownership privileges, and can leave the original publication orphaned.SHOW JOBSalso reports the synthesized new name instead of the historical one. PostgresResourceValidator.validate()opens aJdbcClientand only closes the JDBCConnection. The backing Hikari datasource is never closed, so each CREATE JOB / PG TVF validation leaks FE-side resources.
Checkpoint conclusions:
- Goal: The PR improves new-job slot/publication ownership and cleanup, but it is not yet correct for existing PG jobs across restart/upgrade.
- Minimality: The change is reasonably focused, but the fallback publication default changes persisted-job behavior more broadly than intended.
- Concurrency: I did not find a new locking/deadlock issue; the new validation path appears to run outside FE job-manager locks.
- Lifecycle: Not correct in the validator path because the
JdbcClientis never closed. - Config items:
slot_name/publication_nameare added and CREATE-time immutability is enforced. - Compatibility: Broken for pre-existing PG jobs whose persisted config lacks
publication_name. - Parallel paths: The new validator is wired into both CREATE JOB and TVF paths; the missing path is upgrade/restart coverage for old jobs.
- Conditional checks: The ALTER immutability checks look reasonable and are documented.
- Test coverage: Good new regression coverage for new-job ownership/immutability, but missing upgrade/restart coverage and validator lifecycle coverage.
- Observability: Existing logs look sufficient; I do not see a required new metric here.
- Transaction/persistence: No new EditLog format issue found, but backward-compat handling for old persisted jobs is missing.
- Data/resource cleanup: Cleanup behavior looks correct for newly created jobs; upgraded old jobs can still leak the original publication.
- FE/BE variable passing: The new properties propagate on the touched create/TVF paths.
- Performance: No obvious hot-path regression beyond the leaked validator datasource.
- Other: None.
FE Regression Coverage ReportIncrement line coverage |
…cdc_stream TVF Move PG slot/publication validation out of CdcStreamTableValuedFunction into StreamingInsertJob.initInsertJob so standalone cdc_stream TVF queries (no enclosing job) no longer run ownership checks — the job is the unit that owns a slot/publication, not the TVF. Populate default slot/publication names in two places: - JdbcTvfSourceOffsetProvider.ensureInitialized so sourceProperties carries the resolved names; cleanMeta -> /api/close uses sourceProperties and without this step cdcclient fell back to dbz_publication on close and skipped dropping the Doris-managed publication on DROP JOB. - JdbcTvfSourceOffsetProvider.rewriteTvfParams so each task's TVF props carry the resolved names for cdcclient ownership logic at create time. Add test_cdc_stream_tvf_publication covering auto / user-provided / mixed ownership on the cdc_stream TVF path. Prune the three standalone-TVF fail-fast cases from test_cdc_stream_tvf_postgres since standalone TVF no longer validates PG resources. Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
|
/review |
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
|
OpenCode automated review failed and did not complete. Error: Review step was failure (possibly timeout or cancelled) Please inspect the workflow logs and rerun the review after the underlying issue is resolved. |
FE Regression Coverage ReportIncrement line coverage |
|
run buildall |
|
/review |
There was a problem hiding this comment.
Findings:
- Legacy
cdc_streamPostgreSQL streaming jobs are not upgrade/restart compatible. Replayed jobs whose persisted SQL never containedpublication_namenow materializedoris_pub_{jobId}in the TVF path, so they reconnect with a different publication than the pre-PRdbz_publicationbehavior. - Standalone
cdc_streamwith a user-supplied missingslot_namestill auto-creates that slot inside the cdc client, butclose()now treats it as user-owned and preserves it, leaking replication slots on ad hoc queries. - The TVF-backed
CREATE JOB ... DO INSERT ... SELECT FROM cdc_stream(...)path still skips the new FE-sideslot_name/publication_nameidentifier validation, so invalid identifiers are accepted at create time and only fail later inside the cdc client. - User-managed publications for partitioned root tables are accepted without checking
pg_publication.pubviaroot, so partition changes can be silently skipped even though validation succeeds.
Checkpoint conclusions:
- Goal: Partially achieved for newly created jobs, but the current code does not fully satisfy the PR goal because the four blocking cases above still break correctness/compatibility.
- Minimality: The change set is focused, but the new default-name injection expands behavior in the legacy TVF replay path more than intended.
- Concurrency: No new FE lock-order or deadlock issue found; the new logic appears outside catalog-lock critical sections, and
SLOT_CREATION_LOCKremains localized. - Special lifecycle / replay: Not correct yet. Legacy TVF jobs and standalone TVF queries now take different ownership paths across create/replay/close, so lifecycle handling is inconsistent.
- Configuration items:
slot_name/publication_nameare added and ALTER immutability is enforced in the touched job paths, but create-time validation is still inconsistent between FROM-TO jobs and TVF-backed jobs. - Compatibility / incompatible changes: Broken for existing TVF-backed PostgreSQL jobs whose persisted SQL lacks
publication_name; after upgrade/restart they reconnect with a different publication. - Parallel code paths: FROM-TO and TVF-backed create paths are not fully aligned; only the FROM-TO path gets
DataSourceConfigValidator.validateSource(), and only Doris-managed publications get the partition-root invariant. - Conditional checks: The publication coverage check is insufficient for partition roots; table-name coverage alone does not prove correctness.
- Test coverage: Good new coverage for new-job ownership/cleanup, but missing legacy TVF upgrade/restart coverage, invalid-identifier coverage for TVF-backed CREATE JOB, standalone missing-slot leakage coverage, and user-managed partition-publication coverage.
- Observability: Existing logs look sufficient; I do not see a required new metric.
- Transaction / persistence: No new EditLog-format issue found, but persisted
executeSqlreplay for old TVF jobs lacks backward-compat handling. - Data writes / modifications: The missing
pubviarootvalidation can silently drop change events from partitioned tables, so data correctness is not fully guaranteed. - FE/BE variable passing: The new properties propagate through several paths, but validation and replay semantics are still inconsistent across those paths.
- Performance: No material hot-path regression found beyond the correctness problems above.
- Other issues: None beyond the four blocking items above.
| List<String> tables = Collections.singletonList( | ||
| originTvfProps.get(DataSourceConfigKeys.TABLE)); | ||
| Map<String, String> effective = new HashMap<>(originTvfProps); | ||
| populateDefaultSourceProperties(sourceType, effective, jobId); |
There was a problem hiding this comment.
Blocking: the CREATE JOB ... DO INSERT ... SELECT FROM cdc_stream(...) path still bypasses the new FE identifier validation.
CreateJobCommand.validate() only calls DataSourceConfigValidator.validateSource() for FROM POSTGRES jobs. Here validateTvfSource() goes straight to PostgresResourceValidator, so slot_name / publication_name values like Bad-Name or pub;drop are accepted in the TVF-backed job path even though the PR description says CREATE-time validation now covers both paths. Those raw values later reach the cdc client DDL and fail only at runtime.
Please run the same DataSourceConfigValidator.validateSource() checks on the TVF props before PostgresResourceValidator.validate().
FE Regression Coverage ReportIncrement line coverage |
|
run nonConcurrent |
FE Regression Coverage ReportIncrement line coverage |
What problem does this PR solve?
Problem Summary:
Previously, PostgreSQL CDC streaming jobs created publications using
FOR ALL TABLES, which monitors all tables in the database regardless of which tables the job actually needs to capture. This causes unnecessary WAL decoding overhead and network traffic. Additionally, users could not specify customslot_nameorpublication_name, and these values were not visible inSHOW JOBS.This PR makes the following improvements:
FOR TABLE table1, table2instead ofFOR ALL TABLES, only including tables specified ininclude_tables(ortablefor the cdc_stream TVF path).slot_nameandpublication_namein source properties. If not specified, auto-generated namesdoris_cdc_{jobId}/doris_pub_{jobId}are used.CREATE JOB ... FROM POSTGRES TOpath, auto-generated or user-provided names are persisted insourcePropertiesand visible viaSHOW JOBS.publish_via_partition_root = trueoption for PostgreSQL 13+ in FILTERED mode (previously only in ALL_TABLES mode).doris_cdc_{jobId}/doris_pub_{jobId}) are dropped when the job is deleted; user-provided names are preserved. Each resource is evaluated independently, so a job with a user publication and an auto slot will only drop the auto slot.sourcePropertiesinStreamingInsertJob.init.cdc_streamTVF wrapped inside a streaming INSERT job: validated inStreamingInsertJob.initInsertJobagainst a temporary copy of the TVF properties with defaults populated; the TVF properties map itself is read-only because Nereids may hand back an immutable map.cdc_streamTVF (SELECT without an enclosing job): no PG resource validation — the TVF is a point query and not the unit that owns a slot/publication. Users supplying an invalid slot/publication will get the failure from cdcclient at execution time.JdbcTvfSourceOffsetProvider.ensureInitializedandrewriteTvfParamspopulate default slot/publication names intosourcePropertiesand per-task TVF props so cdcclient ownership logic sees the resolved names both when the reader is created and when/api/closetears it down. Without theensureInitializedstep,cleanMeta -> /api/closewould send a config withoutpublication_name, cdcclient would fall back to the legacydbz_publicationon close, and DROP JOB would leave the Doris-managed publication behind.Release note
Support per-table PostgreSQL publication for streaming CDC jobs. Users can now optionally specify
slot_nameandpublication_namein source properties. Auto-generated slot/publication names use the formatdoris_cdc_{jobId}/doris_pub_{jobId}and are visible in SHOW JOBS output for the from-to path.Check List (For Author)
Test
Behavior changed:
FOR ALL TABLEStoFOR TABLE(filtered mode). Auto-generated slot/publication names now usedoris_cdc_{jobId}/doris_pub_{jobId}. Both are visible in SHOW JOBS output. Standalonecdc_streamTVF no longer performs PG slot/publication ownership validation — the check now runs only when the TVF is wrapped inside a streaming INSERT job.Does this need documentation?
Check List (For Reviewer who merge this PR)